理解 golang channel

关键词:go, channel

goroutine 是一个轻量级的执行单元,相比线程开销更小,完全由 Go 语言负责调度,是 Go 支持并发的核心。开启一个 goroutine 非常简单:

1
go func() {}() 即可

不要通过共享来通信,而要通过通信来共享。

goroutine 运行在相同的地址空间,因此访问共享内存必须做好同步,golang是通过channel来实现的通信的。

unbufered channel

Channel可以分为两种类型,一种是buffer为0的,一种是buffer不为零的。

1
2
3
4
ch := make(chan int)
// 这种就是buffer为零的channel的定义方式
ch := make(chan int, 10)
// 这种就是buffer不为零的定义方式

channelbuffer为零时,由于没有缓存,因此,可以非常容易做到Goroutines的同步.

  • send语句用来往Channel中发送数据, 如ch <- 3
  • receive 语句 <-ch用来从channel ch中接收数据,这个表达式会一直被block,直到有数据可以接收。

unbuffered channel ch进行读操作v := <- ch时(即进行receive时),会一直阻塞,直到有数据sendch中(即当有ch <- value)时才会发生通讯。

即:如果没有设置容量,或者容量设置为0, 说明Channel没有缓存,只有senderreceiver都准备好了后它们的通讯才会发生, 实现goroutine 间的同步(main 函数自身就是一个 goroutine

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main
import (
"fmt"
"time"
)
// this test is about unbuffered channel, it contains no element in chan
//这是 channel 最简单的用法之一:同步,这种类型的 channel 容量是 0,称之为 unbuffered channel。
// *********** 下面一句很重要 ************
//对 unbuffered channel 执行读操作 value := <-ch 会一直阻塞直到有数据可接收, 即直到ch里有数据
//执行写操作 ch <- value 也会一直阻塞直到有 goroutine 对 channel 开始执行接收, 二者必须成对出现
//无缓存的channel只有在receiver准备好后send才被执行。如果有缓存,并且缓存未满,则send会被执行。
//<-ch用来从channel ch中接收数据,这个表达式会一直被block,直到有数据可以接收
func main() {
c := make(chan int)
go func() {
fmt.Println("goroutine message")
time.Sleep(time.Second)
c <- 1 //1
fmt.Println("goroutine message 2")
time.Sleep(time.Second)
fmt.Println("goroutine message 3")
}()
<-c //2
fmt.Println("main function message")
}
------
goroutine message
goroutine message 2
main function message

这个示例代码有两个goroutine, 其中一个是main函数的,一个是go出来的函数,通过返回的结果可以看出,main携程进行到<- c时就阻塞了,因为此时c里时空的,没有向c``中发送数据,转而去执行另一个goroutine, 所以会打印出“goroutine message”,之后进行了一次sleep,交出了这个goroutine的控制权,回到main这个goroutine中,发现c还是没有接受到数据,因此这个maingoroutine依旧阻塞着,过了一秒后,控制权重新回到另一个goroutine, 此时发现有数据写入c了,二者发生了通信,实现了同步,因此打印出了"goroutine message 2""main function message", 由于此时main这个goroutine已经结束了,因此整个程序也结束了,所以另一个goroutine的后续操作就没有继续执行。

buffered channel

如果 channel 的容量不是 0,此类 channel 称之为 buffered channel ,buffered channel 在消息写入个数 未达到容量的上限之前不会阻塞 ,一旦写入消息个数超过上限,下次输入将会阻塞,直到 channel 有位置可以再写入。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main
import (
"fmt"
"strconv"
"time"
)
func dealSlice(c *chan int, num *int, s *[]string) {
for i := 0; i < 4; i++ {
*c <- i
*num++
v := "inner=>" + strconv.Itoa(*num)
*s = append(*s, v)
}
close(*c)
}
func main() {
c := make(chan int, 3)
s := make([]string, 8)
num := 0
go dealSlice(&c, &num, &s)
for i := 0; i < 4; i++ {
time.Sleep(time.Second)
<-c
num++
v := "outer=>" + strconv.Itoa(num)
s = append(s, v)
}
fmt.Println(s)
}
------
[inner=>1 inner=>2 inner=>3 outer=>4 inner=>5 outer=>6 outer=>7 outer=>8]

dealSlice go出去后,由于c的容量是3,因此在c达到3的容量前,是不会阻塞的,此时另一个maingoroutine正在Sleep,一旦当c的容量到了3之后,这个goroutine就阻塞了,直到mainsleep走完,并且完成从c中取出一个数据,一旦从c中取出一个数据,这个时候c的大小就小于3了,此时不再阻塞,于是dealSlice这个goroutine 又开始运行(但是由于两个goroutine都没有阻塞了此时,所以二者谁先append进去是不确定的)但是一旦dealSlice完成后,就使用close将这个c关闭了,此时无法再对c写入任何的数据了,一旦再有数据写入就会引发panic,但是依旧可以从c中读取数据,直到读取完毕,因此,会看到如输出的结果一样的东西。这就是buggered channel的作用,有buffer`,所以在达到容量上限前是不会阻塞的。

select — 使用 select 读取多个 channel

当同时又多个channel时,select 语句可以从多个可读的 channel 中随机选取一个执行,注意是 随机选取。但是这个随机选取是有条件的,这个channel是不阻塞的,如果channel阻塞,他会随机选择其他的不阻塞的一个。

我们上面介绍的都是只有一个channel的情况,那么如果存在多个channel的时候,我们该如何操作呢,Go里面提供了一个关键字select,通过select可以监听channel上的数据流动。

select默认是阻塞的,只有当监听的channel中有发送或接收可以进行时才会运行,当多个channel都准备好的时候,select是随机的选择一个执行的。

代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
------
0
1
1
2
3
5
8
13
21
34
quit

根据代码和输出的情况来看,新起的goroutine一开始就会阻塞,因为c里没有数据可以取出,因此,阻塞在这里等待其他goroutine里有数据输入到c中,而fibonacci所在的main goroutineselect中选择case,两个随机选择,但是<-quit阻塞,因此,只能选择不阻塞的c <- x, 这样两个goroutine``就都可以进行下去,直到func这个 goroutinefor循环结束,此时fibonacci那里的数据已经被func全部取完了,fibonacci又阻塞了,func这时走到了quit <- 0,阻塞到这里,等待quit有人接收,正好此时select发现case2正好可以实现同步,func这个goroutine先结束,主程序maingoroutine后结束(因为func只有一个channel接收数据,而main这个goroutine 除了和其对接的 <- quit 还有一个打印操作,所以,最后结束点落到main上,让程序正好OK,如果funcquit<-0 后,还有一些其他的阻塞操作,那么,可能就无法执行,因为此时主程序的goroutine 已经结束。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main
import (
"fmt"
"time"
)
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
fmt.Println("这个打印还是可以显示的")
time.Sleep(time.Second)
fmt.Println("这个打印应该不会显示了")
}()
fibonacci(c, quit)
}
------
0
1
1
2
3
5
8
13
21
34
这个打印还是可以显示的
quit

正如我们预测的那样。

select里面还有default语法,select其实就是类似switch的功能,default就是当监听的channel都没有准备好的时候,默认执行的(select不再阻塞等待channel)。

1
2
3
4
5
6
select {
case <-ch:
// 当 ch 不阻塞的时候执行这里
default:
// 当 ch 阻塞的时候执行这里
}

timeout 超时的处理

有时候可能会出现goroutine超时的情况,这个时候我们可以使用 select 设置超时的处理,一旦超时超过一段时间,我们就使用 select 选择处理的case,保证程序的正常进行。代码实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
c := make(chan int)
o := make(chan bool)
go func() {
for {
select {
case v := <- c:
println(v)
case <- time.After(5 * time.Second):
println("timeout")
o <- true
break
}
}
}()
<- o
}

我们可以看到此时一共有两个goroutine, 一个是main 的,一个是go 出的func的,func是一个无限循环,由于两个case在5秒内都不会有任何反应,因此会在select这里阻塞5秒,而main中是从o这个channel中取数据,没有数据的流入因此也阻塞到这里,此时超时就发挥作用了,这个time.After()的返回值是一个channel,当5秒过去后,这个case不阻塞了,于是向下执行,先是打印出 timeout这个字符,然后 向o这个channel写入数据,与此同时 <-o这个阻塞也解除,func跳出循环结束函数,main结束此goroutine

close 关闭 channel

Channel 可以被关闭 closechannel 关闭之后仍然可以读取,但是向被关闭的 channel send 会 panic。如果 channel 关闭之前有值写入,关闭之后将依次读取 channel 中的消息,读完完毕之后再次读取将会返回 channel 的类型的 zero value:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main
import (
"fmt"
)
func main() {
c := make(chan int, 3)
go func() {
c <- 1
c <- 2
c <- 3
close(c)
}()
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
fmt.Println(<-c)
}

输出 1 2 3 0 0 0 ,0 是 int channel c 的 zero value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main
import (
"fmt"
)
func main() {
c := make(chan int, 3)
go func() {
c <- 1
c <- 2
c <- 3
close(c)
}()
for i := range c {
fmt.Println(i)
}
}

c 可以进行 range 迭代,如果 channel 没有被关闭 range 会一直等待 channel,但是关闭 channel 之后可以隐式的中断 range 的迭代。

如何判断channel的关闭与否呢?

Go 提供了 ok 表达式来判断 channel 的关闭状态。

1
value, ok <- c

如果 channel 是关闭状态,ok 是 false,value 是 channel 的 zero value,否则 ok 是 true 表示 channel 未关闭,value 表示 channel 中的值。


参考文档